defmodule BullMQ.RedisConnection do
  @moduledoc """
  Redis connection management for BullMQ.

  This module provides a supervised Redis connection pool using NimblePool.
  It handles connection lifecycle, reconnection, and provides a clean API
  for executing Redis commands.

  ## Usage

  Add to your supervision tree:

      children = [
        {BullMQ.RedisConnection,
          name: :bullmq_redis,
          url: "redis://localhost:6379",
          pool_size: 10}
      ]

  Then use it with queues and workers:

      BullMQ.Queue.add("my_queue", "job", %{}, connection: :bullmq_redis)

  ## Options

    * `:name` - The name to register the connection pool (required)
    * `:url` - Redis URL (e.g., "redis://localhost:6379")
    * `:host` - Redis host (default: "localhost")
    * `:port` - Redis port (default: 6379)
    * `:password` - Redis password (optional)
    * `:database` - Redis database number (default: 0)
    * `:pool_size` - Number of connections in the pool (default: 10)
    * `:ssl` - Enable SSL (default: false)
    * `:socket_opts` - Additional socket options
    * `:timeout` - Connection timeout in ms (default: 5000)
  """

  use Supervisor

  require Logger

  @default_pool_size 10
  @default_timeout 5000

  @type connection :: atom() | pid()
  @type command :: [binary() | integer()]
  @type pipeline :: [command()]

  @doc """
  Starts the supervisor that owns the Redis connection pool.

  `opts` must contain `:name`; the supervisor itself is registered under a
  name derived from it. Raises `KeyError` when `:name` is missing.
  """
  @spec start_link(keyword()) :: Supervisor.on_start()
  def start_link(opts) do
    registered_as =
      opts
      |> Keyword.fetch!(:name)
      |> supervisor_name()

    Supervisor.start_link(__MODULE__, opts, name: registered_as)
  end

  # Supervisor callback: builds the child list (command pool first, then the
  # registry used to track blocking connections) under a :one_for_one strategy.
  @impl true
  def init(opts) do
    name = Keyword.fetch!(opts, :name)

    # Main connection pool for commands; every worker receives the same Redix options.
    pool_opts = [
      worker: {__MODULE__.Worker, build_redis_opts(opts)},
      pool_size: Keyword.get(opts, :pool_size, @default_pool_size),
      name: pool_name(name)
    ]

    # Registry for tracking blocking connections.
    registry_opts = [keys: :unique, name: registry_name(name)]

    Supervisor.init(
      [{NimblePool, pool_opts}, {Registry, registry_opts}],
      strategy: :one_for_one
    )
  end

  @doc """
  Executes a Redis command on a connection checked out from the pool.

  ## Options

    * `:timeout` - used both as the pool checkout timeout and as the Redix
      command timeout, in ms (default: #{@default_timeout})

  Returns `{:ok, result}` on success. Any failure is returned as
  `{:error, reason}`: Redis/connection errors reported by Redix, exceptions
  raised while running the command, and exits (for example a pool checkout
  timeout or a dead pool/connection process).

  ## Examples

      BullMQ.RedisConnection.command(:my_redis, ["SET", "key", "value"])
      #=> {:ok, "OK"}

      BullMQ.RedisConnection.command(:my_redis, ["GET", "key"])
      #=> {:ok, "value"}
  """
  @spec command(connection(), command(), keyword()) :: {:ok, term()} | {:error, term()}
  def command(conn, command, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, @default_timeout)

    NimblePool.checkout!(
      pool_name(conn),
      :checkout,
      fn _from, redix ->
        # The connection is handed back to the pool unchanged.
        {Redix.command(redix, command, timeout: timeout), redix}
      end,
      timeout
    )
  rescue
    e -> {:error, e}
  catch
    # NimblePool.checkout!/4 signals timeouts and a missing pool with `exit`,
    # which `rescue` does not intercept.
    :exit, reason -> {:error, reason}
  end

  @doc """
  Executes a Redis command, raising on error.

  Accepts the same arguments as `command/3` and returns the bare result.
  When the error reason is an exception struct it is raised as-is; any other
  reason is wrapped in a `RuntimeError` whose message contains the inspected
  reason.
  """
  @spec command!(connection(), command(), keyword()) :: term()
  def command!(conn, command, opts \\ []) do
    case command(conn, command, opts) do
      {:ok, result} ->
        result

      {:error, error} when is_exception(error) ->
        raise error

      {:error, reason} ->
        # `raise/1` only accepts exceptions, modules or strings; a bare reason
        # term would otherwise surface as a misleading secondary error.
        raise RuntimeError, "Redis command failed: " <> inspect(reason)
    end
  end

  @doc """
  Executes a pipeline of Redis commands on a single pooled connection.

  ## Options

    * `:timeout` - used both as the pool checkout timeout and as the Redix
      pipeline timeout, in ms (default: #{@default_timeout})

  Returns `{:ok, results}` with one entry per command, or `{:error, reason}`
  when the pipeline could not be run. Exceptions and exits (for example a
  pool checkout timeout) are also reported as `{:error, reason}`.

  ## Examples

      BullMQ.RedisConnection.pipeline(:my_redis, [
        ["SET", "key1", "value1"],
        ["SET", "key2", "value2"],
        ["GET", "key1"]
      ])
      #=> {:ok, ["OK", "OK", "value1"]}
  """
  @spec pipeline(connection(), pipeline(), keyword()) :: {:ok, [term()]} | {:error, term()}
  def pipeline(conn, commands, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, @default_timeout)

    NimblePool.checkout!(
      pool_name(conn),
      :checkout,
      fn _from, redix ->
        # The connection is handed back to the pool unchanged.
        {Redix.pipeline(redix, commands, timeout: timeout), redix}
      end,
      timeout
    )
  rescue
    e -> {:error, e}
  catch
    # NimblePool.checkout!/4 signals timeouts and a missing pool with `exit`,
    # which `rescue` does not intercept.
    :exit, reason -> {:error, reason}
  end

  @doc """
  Executes a pipeline, raising on error.

  Accepts the same arguments as `pipeline/3` and returns the bare result
  list. When the error reason is an exception struct it is raised as-is; any
  other reason is wrapped in a `RuntimeError` whose message contains the
  inspected reason.
  """
  @spec pipeline!(connection(), pipeline(), keyword()) :: [term()]
  def pipeline!(conn, commands, opts \\ []) do
    case pipeline(conn, commands, opts) do
      {:ok, results} ->
        results

      {:error, error} when is_exception(error) ->
        raise error

      {:error, reason} ->
        # `raise/1` only accepts exceptions, modules or strings; a bare reason
        # term would otherwise surface as a misleading secondary error.
        raise RuntimeError, "Redis pipeline failed: " <> inspect(reason)
    end
  end

  @doc """
  Executes multiple commands in a Redis transaction (MULTI/EXEC).

  The commands are wrapped in `MULTI` ... `EXEC` and sent as one pipeline on
  a single pooled connection.

  ## Options

    * `:timeout` - used both as the pool checkout timeout and as the Redix
      pipeline timeout, in ms (default: #{@default_timeout})

  ## Return values

    * `{:ok, results}` - the list of replies returned by `EXEC`
    * `{:error, :transaction_aborted}` - `EXEC` replied with `nil`
      (e.g. a `WATCH` failed)
    * `{:error, {:transaction_errors, results}}` - at least one reply inside
      the `EXEC` result is a `Redix.Error`; `results` is the full reply list.
      Note that Redis does not roll back the commands that did succeed.
    * `{:error, reason}` - `EXEC` itself failed, the pipeline failed, an
      exception was raised, or the caller would otherwise have exited
      (for example on a pool checkout timeout)

  ## Examples

      BullMQ.RedisConnection.transaction(:my_redis, [
        ["SET", "key1", "value1"],
        ["SET", "key2", "value2"],
        ["GET", "key1"]
      ])
      #=> {:ok, ["OK", "OK", "value1"]}
  """
  @spec transaction(connection(), pipeline(), keyword()) :: {:ok, [term()]} | {:error, term()}
  def transaction(conn, commands, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, @default_timeout)

    # Wrap commands in MULTI/EXEC
    transaction_commands = [["MULTI"]] ++ commands ++ [["EXEC"]]

    reply =
      NimblePool.checkout!(
        pool_name(conn),
        :checkout,
        fn _from, redix ->
          {Redix.pipeline(redix, transaction_commands, timeout: timeout), redix}
        end,
        timeout
      )

    case reply do
      {:ok, replies} ->
        # Replies are: "OK" (MULTI), "QUEUED", ..., then the EXEC reply, which
        # carries the actual per-command results.
        replies |> List.last() |> interpret_exec_reply()

      {:error, reason} ->
        {:error, reason}
    end
  rescue
    e -> {:error, e}
  catch
    # NimblePool.checkout!/4 signals timeouts and a missing pool with `exit`,
    # which `rescue` does not intercept.
    :exit, reason -> {:error, reason}
  end

  # Translates the reply of the trailing EXEC into the public result tuple.
  defp interpret_exec_reply(exec_reply) do
    case exec_reply do
      nil ->
        # Transaction was aborted (e.g., WATCH failed)
        {:error, :transaction_aborted}

      %Redix.Error{} = error ->
        {:error, error}

      results when is_list(results) ->
        if Enum.any?(results, &match?(%Redix.Error{}, &1)) do
          {:error, {:transaction_errors, results}}
        else
          {:ok, results}
        end
    end
  end

  @doc """
  Executes a Lua script with `EVAL`.

  `keys` are passed as the script's `KEYS`; `args` are converted to strings
  and passed as `ARGV`.

  ## Options

    * `:timeout` - used both as the pool checkout timeout and as the Redix
      command timeout, in ms (default: #{@default_timeout})

  Returns `{:ok, result}` or `{:error, reason}`. Exceptions and exits (for
  example a pool checkout timeout) are also reported as `{:error, reason}`.

  ## Examples

      BullMQ.RedisConnection.eval(:my_redis, "return KEYS[1]", ["mykey"], [])
      #=> {:ok, "mykey"}
  """
  @spec eval(connection(), String.t(), [String.t()], [term()], keyword()) ::
          {:ok, term()} | {:error, term()}
  def eval(conn, script, keys, args, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, @default_timeout)
    command = ["EVAL", script, length(keys) | keys ++ stringify_args(args)]

    NimblePool.checkout!(
      pool_name(conn),
      :checkout,
      fn _from, redix ->
        {Redix.command(redix, command, timeout: timeout), redix}
      end,
      timeout
    )
  rescue
    e -> {:error, e}
  catch
    # NimblePool.checkout!/4 signals timeouts and a missing pool with `exit`,
    # which `rescue` does not intercept.
    :exit, reason -> {:error, reason}
  end

  @doc """
  Executes a Lua script using EVALSHA (cached script).

  Falls back to `EVAL` with the full `script` source, on the same connection,
  when Redis answers with a `NOSCRIPT` error.

  ## Options

    * `:timeout` - used as the pool checkout timeout and as the timeout of
      each Redix command, in ms (default: #{@default_timeout})

  Returns `{:ok, result}` or `{:error, reason}`. Exceptions and exits (for
  example a pool checkout timeout) are also reported as `{:error, reason}`.
  """
  @spec evalsha(connection(), String.t(), String.t(), [String.t()], [term()], keyword()) ::
          {:ok, term()} | {:error, term()}
  def evalsha(conn, sha, script, keys, args, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, @default_timeout)

    # Shared tail of both EVALSHA and EVAL: numkeys, keys, then stringified args.
    script_args = [length(keys) | keys ++ stringify_args(args)]

    NimblePool.checkout!(
      pool_name(conn),
      :checkout,
      fn _from, redix ->
        case Redix.command(redix, ["EVALSHA", sha | script_args], timeout: timeout) do
          {:error, %Redix.Error{message: "NOSCRIPT" <> _}} ->
            # Script not cached, use EVAL
            {Redix.command(redix, ["EVAL", script | script_args], timeout: timeout), redix}

          result ->
            {result, redix}
        end
      end,
      timeout
    )
  rescue
    e -> {:error, e}
  catch
    # NimblePool.checkout!/4 signals timeouts and a missing pool with `exit`,
    # which `rescue` does not intercept.
    :exit, reason -> {:error, reason}
  end

  @doc """
  Creates a dedicated blocking connection for operations like BRPOPLPUSH or BZPOPMIN.

  A new Redix connection is started (linked to the calling process) so that
  blocking commands do not occupy a connection from the main pool. The base
  options come from `get_redis_opts/1`; entries in `opts` override base
  options with the same key.

  On success the connection pid is registered in the connection registry
  under `{:blocking, self()}` and `{:ok, pid}` is returned. Any other result
  of `Redix.start_link/1` is returned unchanged.
  """
  @spec blocking_connection(connection(), keyword()) :: {:ok, pid()} | {:error, term()}
  def blocking_connection(conn, opts \\ []) do
    # Keyword.merge/2 (rather than ++) so caller-supplied values win over the
    # base options instead of being shadowed by an earlier duplicate key.
    redis_opts = Keyword.merge(get_redis_opts(conn), opts)

    case Redix.start_link(redis_opts) do
      {:ok, pid} ->
        # Register the blocking connection. The key is unique per calling
        # process; the registration result is intentionally not checked.
        Registry.register(registry_name(conn), {:blocking, self()}, pid)
        {:ok, pid}

      error ->
        error
    end
  end

  @doc """
  Closes a blocking connection.

  Removes the calling process's `{:blocking, self()}` entry from the
  connection registry and stops the Redix process `pid` if it is still
  alive. This is best-effort: failures while unregistering or stopping are
  ignored and `:ok` is always returned.
  """
  @spec close_blocking(connection(), pid()) :: :ok
  def close_blocking(conn, pid) do
    # Unregistering is isolated in its own `try` so that a failure here
    # (e.g. the registry is not running) cannot skip stopping the connection.
    try do
      Registry.unregister(registry_name(conn), {:blocking, self()})
    rescue
      _ -> :ok
    end

    # Safely stop the Redix connection
    if Process.alive?(pid) do
      try do
        Redix.stop(pid)
      catch
        # The process may die between the liveness check and the stop call.
        :exit, _ -> :ok
      end
    end

    :ok
  rescue
    _ -> :ok
  end

  @doc """
  Disconnects a blocking connection for reconnection.

  Sends `CLIENT UNBLOCK self` on `pid` with a 100 ms timeout. This is
  best-effort: the command result is discarded, and exceptions or exits
  (for example when `pid` is no longer alive) are swallowed. Always returns
  `:ok`.
  """
  @spec disconnect_blocking(pid()) :: :ok
  def disconnect_blocking(pid) do
    # Send a command to interrupt blocking.
    # NOTE(review): Redis documents CLIENT UNBLOCK as taking a numeric client
    # id, and this is issued on the blocked connection itself - confirm that
    # this actually interrupts the pending blocking command.
    _ = Redix.command(pid, ["CLIENT", "UNBLOCK", "self"], timeout: 100)
    :ok
  rescue
    _ -> :ok
  catch
    # A dead connection process makes Redix exit rather than raise.
    :exit, _ -> :ok
  end

  @doc """
  Gets the underlying redis options for creating new connections.

  The options are read back from the child specification of the NimblePool
  child running under this connection's supervisor (the second element of
  its `:worker` option).

  Returns `[]` when the supervisor is not running, when no such child is
  found, or when the supervisor goes away during the lookup.

  Must not be called from inside the supervisor process itself, since it
  performs synchronous calls to it.
  """
  @spec get_redis_opts(connection()) :: keyword()
  def get_redis_opts(conn) do
    case Process.whereis(supervisor_name(conn)) do
      nil ->
        []

      sup ->
        sup
        |> Supervisor.which_children()
        |> Enum.find_value([], &pool_redis_opts(sup, &1))
    end
  catch
    # The supervisor may terminate between the lookup and the calls to it.
    :exit, _ -> []
  end

  # Returns the Redix options embedded in a child's start arguments when that
  # child is started through NimblePool.start_link/1, otherwise nil.
  defp pool_redis_opts(sup, {id, _child, _type, _modules}) do
    with {:ok, %{start: {NimblePool, :start_link, [pool_opts]}}} <-
           :supervisor.get_childspec(sup, id),
         {_worker, redis_opts} when is_list(redis_opts) <- Keyword.get(pool_opts, :worker) do
      redis_opts
    else
      _ -> nil
    end
  end

  # Private helpers

  # Builds the option list handed to each pool worker from the user options.
  #
  # Connection coordinates come from `:url` when that key is present,
  # otherwise from `:host` / `:port` / `:password` / `:database`. The
  # `:ssl`, `:socket_opts`, `:timeout` and `sync_connect: false` settings are
  # merged on top, and entries whose value is nil are dropped.
  defp build_redis_opts(opts) do
    connection_opts =
      if Keyword.has_key?(opts, :url) do
        opts |> Keyword.get(:url) |> parse_redis_url()
      else
        [
          host: Keyword.get(opts, :host, "localhost"),
          port: Keyword.get(opts, :port, 6379),
          password: Keyword.get(opts, :password),
          database: Keyword.get(opts, :database, 0)
        ]
      end

    transport_opts = [
      ssl: Keyword.get(opts, :ssl, false),
      socket_opts: Keyword.get(opts, :socket_opts, []),
      timeout: Keyword.get(opts, :timeout, @default_timeout),
      sync_connect: false
    ]

    for {_key, value} = entry <- Keyword.merge(connection_opts, transport_opts),
        not is_nil(value),
        do: entry
  end

  # Extracts host, port, password and database from a Redis URL string.
  #
  # Missing host/port fall back to "localhost" / 6379. The password is the
  # part of the userinfo after the first ":" (or the whole userinfo when it
  # has no ":"). The database is the leading integer of the path after "/",
  # defaulting to 0. Non-binary input yields only the default host and port.
  defp parse_redis_url(url) when is_binary(url) do
    %URI{host: host, port: port, userinfo: userinfo, path: path} = URI.parse(url)

    # Userinfo format: user:password or just password
    password =
      if is_nil(userinfo) do
        nil
      else
        userinfo |> String.split(":", parts: 2) |> List.last()
      end

    # Database comes from the path (e.g., /0 for database 0)
    database =
      case path do
        blank when blank in [nil, "", "/"] ->
          0

        "/" <> digits ->
          case Integer.parse(digits) do
            :error -> 0
            {number, _rest} -> number
          end
      end

    [
      host: host || "localhost",
      port: port || 6379,
      password: password,
      database: database
    ]
  end

  defp parse_redis_url(_), do: [host: "localhost", port: 6379]

  # Converts every script argument to a binary: binaries are kept as-is,
  # numbers and atoms use their standard string form, anything else is
  # rendered with `inspect/1`.
  defp stringify_args(args), do: Enum.map(args, &stringify_arg/1)

  defp stringify_arg(value) when is_binary(value), do: value
  defp stringify_arg(value) when is_integer(value), do: Integer.to_string(value)
  defp stringify_arg(value) when is_float(value), do: Float.to_string(value)
  defp stringify_arg(value) when is_atom(value), do: Atom.to_string(value)
  defp stringify_arg(value), do: inspect(value)

  # Name under which the NimblePool for connection `name` is registered.
  defp pool_name(name), do: String.to_atom("#{name}_pool")

  # Name under which the supervisor for connection `name` is registered.
  defp supervisor_name(name), do: String.to_atom("#{name}_sup")

  # Name under which the blocking-connection registry for `name` is registered.
  defp registry_name(name), do: String.to_atom("#{name}_registry")

  # NimblePool worker implementation
  defmodule Worker do
    @moduledoc false

    # NimblePool worker callbacks. The worker state is the pid of a Redix
    # connection; the pool state is the Redix option list the pool was
    # started with.

    @behaviour NimblePool

    # Starts one Redix connection per pool slot. A failed start crashes on
    # the match.
    @impl NimblePool
    def init_worker(redis_opts) do
      {:ok, pid} = Redix.start_link(redis_opts)
      {:ok, pid, redis_opts}
    end

    # Hands the connection pid to the client and keeps it as the worker state.
    @impl NimblePool
    def handle_checkout(:checkout, _from, pid, pool_state) do
      {:ok, pid, pid, pool_state}
    end

    # The pid returned by the client becomes the worker state again.
    @impl NimblePool
    def handle_checkin(pid, _from, _old_pid, pool_state) do
      {:ok, pid, pool_state}
    end

    # Stops the connection; failures while stopping are ignored.
    @impl NimblePool
    def terminate_worker(_reason, pid, pool_state) do
      Redix.stop(pid)
      {:ok, pool_state}
    rescue
      _ -> {:ok, pool_state}
    catch
      # Stopping an already-dead connection exits instead of raising.
      :exit, _ -> {:ok, pool_state}
    end

    # NimblePool expects `{:ok, worker_state}` here; returning the pool state
    # would replace the connection pid with the option list.
    @impl NimblePool
    def handle_ping(pid, _pool_state) do
      {:ok, pid}
    end
  end
end
